feat(pyth-lazer-agent) Allow deduplicating updates within each batch #2944
Conversation
relayer_urls = ["wss://relayer.pyth-lazer-staging.dourolabs.app/v1/transaction", "wss://relayer-1.pyth-lazer-staging.dourolabs.app/v1/transaction"] | ||
publish_keypair_path = "/path/to/solana/id.json" | ||
relayer_urls = ["ws://localhost:10001/v1/transaction"] | ||
publish_keypair_path = "/tmp/keypair.json" |
In the future we should have keys in the repo for local testing.
let mut deduped_feed_updates = Vec::new();
let mut last_feed_update = HashMap::new();

// assume that feed_updates is already sorted by ts (within feed_update_id groups)
Suggested change:
- // assume that feed_updates is already sorted by ts (within feed_update_id groups)
+ // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
if let Some(update) = feed_update.update.as_ref() {
    if let Some(last_update) = last_feed_update.get(&feed_id) {
        if update == last_update {
            continue;
        }
    }

    deduped_feed_updates.push(feed_update.clone());
    last_feed_update.insert(feed_id, update.clone());
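For reference, here is a minimal, self-contained sketch of the dedup loop this hunk implements, using simplified stand-in types (the real code operates on the generated FeedUpdate protobuf messages, so the field names here are illustrative only):

use std::collections::HashMap;

// Hypothetical simplified types, standing in for the real protobuf-generated ones.
#[derive(Clone, PartialEq, Debug)]
struct PriceUpdate {
    price: i64,
}

#[derive(Clone, Debug)]
struct FeedUpdate {
    feed_id: u32,
    update: Option<PriceUpdate>,
}

/// Keep the first occurrence of each value per feed and drop consecutive
/// duplicates, preserving the original (timestamp-sorted) order.
fn deduplicate_feed_updates(feed_updates: &[FeedUpdate]) -> Vec<FeedUpdate> {
    let mut deduped_feed_updates = Vec::new();
    let mut last_feed_update: HashMap<u32, PriceUpdate> = HashMap::new();

    for feed_update in feed_updates {
        if let Some(update) = feed_update.update.as_ref() {
            // Skip the update if it equals the last one we kept for this feed.
            if last_feed_update.get(&feed_update.feed_id) == Some(update) {
                continue;
            }
            deduped_feed_updates.push(feed_update.clone());
            last_feed_update.insert(feed_update.feed_id, update.clone());
        }
    }

    deduped_feed_updates
}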
I think we are keeping the update with the lowest timestamp; shouldn't we keep the highest timestamp?
I would insert them all into the map and collect it into a Vec at the end.
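A rough sketch of that suggestion, using the same hypothetical types as the sketch above. Note that collecting from the map keeps only a single update per feed and loses the batch ordering, which is a stronger reduction than removing consecutive duplicates:

use std::collections::HashMap;

// Keep only the last update seen per feed and emit the map's values at the end.
fn last_update_per_feed(feed_updates: &[FeedUpdate]) -> Vec<FeedUpdate> {
    let mut latest: HashMap<u32, FeedUpdate> = HashMap::new();
    for feed_update in feed_updates {
        if feed_update.update.is_some() {
            latest.insert(feed_update.feed_id, feed_update.clone());
        }
    }
    latest.into_values().collect()
}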
You can also use dedup_by_key on the std Vec.
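Something along these lines (again with the hypothetical types from the earlier sketch). Note that Vec::dedup_by_key only removes adjacent duplicates, so this relies on updates for the same feed being contiguous in the batch:

// In-place removal of consecutive (feed_id, update) duplicates.
fn dedup_in_place(feed_updates: &mut Vec<FeedUpdate>) {
    feed_updates.dedup_by_key(|u| (u.feed_id, u.update.clone()));
}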
shouldn't we keep the highest timestamp?
I don't think we should. My reasoning was that we generally care about lowest latency, so it's fairer to keep the first value we've seen and remove consecutive duplicates - that way we capture the 'earliest' timestamp in the batch (it may not affect quality metrics, but it's still a better representation of what the publisher does).
In contrast, keeping the "highest" timestamp is kind of like a "most recently seen" cache. If we do that, is there even a point in recording intra-batch history instead of just returning the last seen value?
What's your rationale for keeping the last rather than first? What's the benefit?
Given that this is intended for real-time consumption, I'd think we want to reflect the latest timestamp that this data is accurate as-of, but I'm not entirely clear. If so, yeah, for a single publisher stream, why not just send the most recently seen per batch.
why not just send the most recently seen per batch.
I don't want to censor data any more than I want to invent it. I'm OK with removing truly duplicate values, but I'm not comfortable deleting things that actually contain information.
Maybe we should just do a simple deduplication on (feed_id, source_timestamp)?
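That could look roughly like this, assuming a hypothetical source_timestamp_us field that the earlier simplified sketch omitted (reusing its PriceUpdate type):

use std::collections::HashSet;

// Hypothetical shape carrying the source timestamp alongside the payload.
#[derive(Clone)]
struct TimestampedFeedUpdate {
    feed_id: u32,
    source_timestamp_us: i64,
    update: Option<PriceUpdate>,
}

// Keep only the first update seen for each (feed_id, source_timestamp) pair.
fn dedup_by_feed_and_timestamp(updates: &[TimestampedFeedUpdate]) -> Vec<TimestampedFeedUpdate> {
    let mut seen = HashSet::new();
    updates
        .iter()
        .filter(|u| seen.insert((u.feed_id, u.source_timestamp_us)))
        .cloned()
        .collect()
}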
IMHO the data is accurate "since" the first time it was seen, "until" the end of the aggregation window (at which point the publisher needs to retransmit unchanged entries) or, notionally, until they send a changed value (for which we also need to know the earliest occurrence). Knowing the "most recent" occurrence of a value within an aggregation window doesn't give us any useful information. Knowing the "first time" the value showed up in a batch lets us better understand the timing characteristics of each publisher.
The reason I was suggesting the latest timestamp was price expiry: when we keep the lowest timestamp, the price will expire faster than it should if they don't send another update. However, I think you are right that keeping the information about the earliest time a publisher sent us a new price is more important. Let's keep the lowest timestamp.
I think we should keep the latest timestamp
Agree with your conclusions on earliest vs latest in the batch. You've got some small things to fix but overall LGTM!
@@ -299,6 +299,7 @@ pub mod tests {
    publish_keypair_path: Default::default(),
    publish_interval_duration: Default::default(),
    history_service_url: None,
    enable_update_deduplication: false,
};

println!("{:?}", get_metadata(config).await.unwrap());
Is there a reason this is a println and not an info log?
We don't have tracing set up properly in unit tests, making println the easiest choice. This is just for that one manual test that's only ever run locally during dev to grab example data.
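As an aside, if we ever do want proper logs in unit tests, a common pattern with the tracing-subscriber crate is roughly the following (not part of this PR, just a note for later):

fn init_test_tracing() {
    // with_test_writer routes output through the test harness capture;
    // try_init avoids panicking if another test already installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_test_writer()
        .try_init();
}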
Summary
Add an option to deduplicate updates within each batch before sending them over to Lazer. The dedup logic keeps all distinct updates, only removing consecutive duplicates.
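To illustrate with a made-up batch (reusing the simplified sketch types and the deduplicate_feed_updates function from the review thread above):

let batch = vec![
    FeedUpdate { feed_id: 42, update: Some(PriceUpdate { price: 100 }) },
    FeedUpdate { feed_id: 42, update: Some(PriceUpdate { price: 100 }) }, // consecutive duplicate: dropped
    FeedUpdate { feed_id: 42, update: Some(PriceUpdate { price: 101 }) }, // new value: kept
    FeedUpdate { feed_id: 42, update: Some(PriceUpdate { price: 100 }) }, // reappears after a change: kept
];
assert_eq!(deduplicate_feed_updates(&batch).len(), 3);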
Rationale
Reduce pressure on the relayers for publishers that emit updates without any changes at a frequency higher than the publish interval.
How has this been tested?